package com.primeton.poctag.task;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.exec.ExecuteWatchdog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

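/**
 * <p>
 * Static, in-memory registry that tracks submitted jobs/tasks, the Yarn
 * applications reported for them (with their latest state), and the
 * {@link ExecuteWatchdog} observers attached to tasks.
 * </p>
 *
 * Created by zhaopx on 2018/1/24 0024-14:56
 * Vendor: primeton.com
 */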
public class JobQueue {


    /**
     * Yarn application states that precede the RUNNING state.
     */
    private static final Set<String> BEFORE_RUNNING_STATES =
            ImmutableSet.of("NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED");


    /**
     * The Yarn application state that marks an application as running.
     */
    private static final Set<String> RUNNING_STATES = ImmutableSet.of("RUNNING");


    /**
     * Yarn application states that mark an application as ended.
     */
    private static final Set<String> FINISHED_STATES =
            ImmutableSet.of("FINISHED", "FAILED", "KILLED");


    /**
     * taskId -> job detail. Entries are added by {@link #pushNewTask} and keyed by task id.
     */
    protected final static Map<String, JobEntity> taskId2Job = new ConcurrentHashMap<String, JobEntity>();


    /**
     * Cache of the applications running on Yarn and their state: appId -> {jobId, taskId, state}.
     */
    protected final static Map<String, AppEntity> appId2Task = new ConcurrentHashMap<String, AppEntity>();


    /**
     * taskId -> watchdog registered as the observer of that task.
     */
    protected final static Map<String, ExecuteWatchdog> TASKID_TO_TASKREF_MAP =
            new ConcurrentHashMap<String, ExecuteWatchdog>();


    protected static final Logger LOG = LoggerFactory.getLogger(JobQueue.class);


    /**
     * Registers the observer (watchdog) of a task, replacing any observer
     * previously registered under the same task id.
     *
     * @param jobId Job ID (not used; observers are keyed by task id only)
     * @param taskId Task ID
     * @param watchdog the observer
     */
    public static void addTaskObserver(String jobId, String taskId, ExecuteWatchdog watchdog) {
        TASKID_TO_TASKREF_MAP.put(taskId, watchdog);
    }


    /**
     * Removes the observer registered for a task, if any.
     *
     * @param jobId Job ID (not used; observers are keyed by task id only)
     * @param taskId Task ID
     */
    public static void removeTaskObserver(String jobId, String taskId) {
        TASKID_TO_TASKREF_MAP.remove(taskId);
    }


    /**
     * Looks up the observer registered for a task.
     *
     * @param jobId Job ID (not used; observers are keyed by task id only)
     * @param taskId Task ID
     * @return the registered watchdog, or {@code null} when none is registered
     */
    public static ExecuteWatchdog getTaskObserver(String jobId, String taskId) {
        return TASKID_TO_TASKREF_MAP.get(taskId);
    }


    /**
     * Returns the applications that have not started running yet.
     *
     * @return immutable set of application ids whose state is exactly one of
     *         NEW, NEW_SAVING, SUBMITTED or ACCEPTED
     */
    public static Set<String> getBeforeRunning() {
        return appIdsInStates(BEFORE_RUNNING_STATES);
    }


    /**
     * Returns the applications that are currently running.
     *
     * @return immutable set of application ids whose state is RUNNING
     */
    public static Set<String> getRunningJob() {
        return appIdsInStates(RUNNING_STATES);
    }


    /**
     * Returns the jobs currently held in the task registry.
     *
     * NOTE(review): {@link JobEntity#equals(Object)} compares the job id only, so
     * several registered tasks that share a job id show up as a single element here.
     * Confirm this is what callers expect.
     *
     * @return immutable snapshot of the registered job entities
     */
    public static Set<JobEntity> getRunningJob0() {
        return ImmutableSet.copyOf(taskId2Job.values());
    }


    /**
     * Returns the applications that have ended.
     *
     * @return immutable set of application ids whose state is exactly one of
     *         FINISHED, FAILED or KILLED
     */
    public static Set<String> getFinishedJob() {
        return appIdsInStates(FINISHED_STATES);
    }


    /**
     * Collects the ids of all cached applications whose state is a member of {@code states}.
     * The match is an exact string comparison; applications without a state are skipped.
     *
     * @param states the states to select
     * @return immutable set of matching application ids
     */
    private static Set<String> appIdsInStates(Set<String> states) {
        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
        for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
            String state = entry.getValue().getState();
            if (state != null && states.contains(state)) {
                builder.add(entry.getKey());
            }
        }
        return builder.build();
    }


    /**
     * Registers a newly started job in the queue, keyed by its task id.
     *
     * @param jobId Job ID
     * @param taskId Task ID
     * @param uuid UUID stored with the job
     * @param map job parameters
     */
    public static void pushNewTask(String jobId, String taskId, String uuid, JSONObject map) {
        JobEntity jobEntity = new JobEntity(jobId, taskId, uuid, map);
        taskId2Job.put(taskId, jobEntity);
    }


    /**
     * Records the latest state reported for a Yarn application (pushed back by the
     * log-parsing side). The cached entry for {@code appId} is created on first report
     * and overwritten with the given job id, task id and state on every report.
     *
     * @param appId Yarn application id
     * @param jobId Job ID
     * @param taskId Task ID
     * @param state reported state
     */
    public static void runningTask(String appId, String jobId, String taskId, String state) {
        AppEntity appEntity = appId2Task.get(appId);
        if (appEntity == null) {
            appEntity = new AppEntity(appId);
        }

        appEntity.setJobId(jobId);
        appEntity.setTaskId(taskId);
        appEntity.setState(state);
        appId2Task.put(appId, appEntity);

        // Original author's note: the end of a job is always reported one or more times
        // and needs no special handling; before running, however, the queue may be blocked
        // and startup slow, which is different from a timeout and needs a callback.
        // No such callback is implemented here.
    }


    /**
     * Removes an application, and the job registered for its task, from monitoring.
     * Does nothing when the application is not cached.
     *
     * @param appId Yarn application id
     */
    public static void removeMonitorJob(String appId) {
        AppEntity appEntity = appId2Task.get(appId);
        if (appEntity == null) {
            return;
        }

        // The application itself is dropped in every case.
        appId2Task.remove(appId);

        // ConcurrentHashMap rejects null keys, so an application that was reported
        // without a task id is handled like one whose job is no longer registered.
        String taskId = appEntity.getTaskId();
        JobEntity jobEntity = taskId == null ? null : taskId2Job.remove(taskId);
        if (jobEntity == null) {
            LOG.warn("{} 已完成，移除该任务。", taskId);
            return;
        }

        LOG.warn("{}/{} 已完成，移除该任务。", appEntity.getJobId(), appId);
    }


    /**
     * Removes a finished task from the task registry.
     *
     * @param jobId Job ID (used for logging only)
     * @param taskId Task ID
     */
    public static void removeMonitorTask(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 已完成，移除该任务。", jobId, taskId);
    }

    /**
     * Removes a failed task from the task registry.
     *
     * @param jobId Job ID (used for logging only)
     * @param taskId Task ID
     */
    public static void removeErrorJob(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 运行失败，从动态列表中移除。", jobId, taskId);
    }


    /**
     * Returns the cached information of an application.
     *
     * @param appId Yarn application id
     * @return the cached entry, or {@code null} when the application is not cached
     */
    public static AppEntity getApp(String appId) {
        return appId2Task.get(appId);
    }

    /**
     * Cached view of one Yarn application: its id plus the job id, task id and
     * state last reported for it. Two instances are equal when their application
     * ids are equal.
     */
    public static class AppEntity {

        final String appId;

        String jobId;

        String taskId;

        String state;


        public AppEntity(String appId) {
            this.appId = appId;
        }


        public String getAppId() {
            return appId;
        }

        public String getJobId() {
            return jobId;
        }

        public void setJobId(String jobId) {
            this.jobId = jobId;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }


        /**
         * Equality is based on the application id only.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            AppEntity other = (AppEntity) o;
            return appId != null ? appId.equals(other.appId) : other.appId == null;
        }

        @Override
        public int hashCode() {
            return appId != null ? appId.hashCode() : 0;
        }
    }


    /**
     * Description of one registered job: job id, task id, uuid and configuration.
     * Two instances are equal when their job ids are equal.
     */
    public static class JobEntity {

        /**
         * Job ID
         */
        String jobId;

        /**
         * Task ID
         */
        String taskId;


        /**
         * UUID
         */
        String uuid;


        /**
         * Job configuration
         */
        JSONObject map;


        /**
         * Creates a job entity.
         *
         * @param jobId Job ID
         * @param taskId Task ID
         * @param uuid UUID
         * @param map job configuration
         */
        public JobEntity(String jobId, String taskId, String uuid, JSONObject map) {
            this.jobId = jobId;
            this.taskId = taskId;
            this.uuid = uuid;
            this.map = map;
        }


        public String getJobId() {
            return jobId;
        }


        public String getTaskId() {
            return taskId;
        }

        public String getUuid() {
            return uuid;
        }

        public JSONObject getMap() {
            return map;
        }


        /**
         * Equality is based on the job id only; task id, uuid and configuration are ignored.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            JobEntity other = (JobEntity) o;
            return jobId != null ? jobId.equals(other.jobId) : other.jobId == null;
        }

        @Override
        public int hashCode() {
            return jobId != null ? jobId.hashCode() : 0;
        }
    }

}
